package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.BaseAppV2;
import com.atguigu.gmall.realtime.base.OrderDetail;
import com.atguigu.gmall.realtime.base.OrderInfo;
import com.atguigu.gmall.realtime.base.OrderWide;
import com.atguigu.gmall.realtime.common.Constant;
import com.atguigu.gmall.realtime.function.DimAsyncFunction;
import com.atguigu.gmall.realtime.util.DimUtil;
import com.atguigu.gmall.realtime.util.FlinkSinkUtil;
import com.atguigu.gmall.realtime.util.FlinkSourceUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author liuyun
 * @title: DwmOrderWideApp
 * @projectName gmal0924
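 * @description: Interval-joins the DWD order-info and order-detail streams into OrderWide records, fills in dimension fields asynchronously via DimUtil.readDim (using the Redis and Phoenix clients), and writes the result as JSON to the DWM order-wide Kafka topic.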
 * @date 2022-3-21 10:49
 */
public class DwmOrderWideAppCacheAsync extends BaseAppV2 {

    /**
     * Job entry point: hands the two DWD fact topics to {@code BaseAppV2#init}.
     *
     * <p>NOTE(review): the meaning of the leading arguments (3003, 1 and the two
     * name strings) is defined by {@code BaseAppV2}, which is not visible here —
     * presumably port, parallelism, checkpoint name and consumer group; confirm.
     */
    public static void main(String[] args) {
        DwmOrderWideAppCacheAsync app = new DwmOrderWideAppCacheAsync();
        app.init(
                3003,
                1,
                "DwmOrderWideAppCacheAsync",
                "DwmOrderWideAppCacheAsync",
                Constant.TOPIC_DWD_ORDER_INFO,
                Constant.TOPIC_DWD_ORDER_DETAIL);
    }

    /**
     * Wires the pipeline in three steps: join the two fact streams, enrich the
     * joined records with dimension data, and write the result to Kafka.
     *
     * @param env            the execution environment (not used directly by this method)
     * @param topicStreamMap source streams keyed by topic name
     */
    @Override
    protected void handle(StreamExecutionEnvironment env,
                          HashMap<String, DataStreamSource<String>> topicStreamMap) {
        // Step 1: fact-to-fact join (order info x order detail).
        SingleOutputStreamOperator<OrderWide> joinedFacts = factsJoin(topicStreamMap);

        // Step 2: fact-to-dimension enrichment.
        SingleOutputStreamOperator<OrderWide> enriched = factJoinDim(joinedFacts);

        // Step 3: publish the wide records to Kafka.
        writeToKafka(enriched);
    }

    /**
     * Joins the order-info and order-detail streams with an interval join.
     *
     * <p>Both inputs are parsed from JSON, given event-time timestamps from
     * {@code getCreate_ts()} with a 3-second bounded-out-of-orderness watermark,
     * and keyed by order id. Interval joins require keyed streams and only work
     * on event time. A detail record matches an info record when its timestamp
     * lies within 10 seconds before or after the info record's timestamp.
     *
     * @param topicStreamMap source streams keyed by topic name
     * @return one {@link OrderWide} per matching (order info, order detail) pair
     */
    private SingleOutputStreamOperator<OrderWide> factsJoin(HashMap<String, DataStreamSource<String>> topicStreamMap) {
        // ---- order info side ----
        WatermarkStrategy<OrderInfo> infoWatermarks = WatermarkStrategy
                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((info, ts) -> info.getCreate_ts());

        SingleOutputStreamOperator<OrderInfo> parsedInfos = topicStreamMap
                .get(Constant.TOPIC_DWD_ORDER_INFO)
                .map(info -> JSON.parseObject(info, OrderInfo.class));

        KeyedStream<OrderInfo, Long> infosByOrderId = parsedInfos
                .assignTimestampsAndWatermarks(infoWatermarks)
                .keyBy(OrderInfo::getId);  // key: order id

        // ---- order detail side ----
        WatermarkStrategy<OrderDetail> detailWatermarks = WatermarkStrategy
                .<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((detail, ts) -> detail.getCreate_ts());

        SingleOutputStreamOperator<OrderDetail> parsedDetails = topicStreamMap
                .get(Constant.TOPIC_DWD_ORDER_DETAIL)
                .map(info -> JSON.parseObject(info, OrderDetail.class));

        KeyedStream<OrderDetail, Long> detailsByOrderId = parsedDetails
                .assignTimestampsAndWatermarks(detailWatermarks)
                .keyBy(OrderDetail::getOrder_id);  // key: order id of the detail row

        // ---- interval join ----
        ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide> toOrderWide =
                new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo orderInfo,
                                               OrderDetail orderDetail,
                                               Context ctx,
                                               Collector<OrderWide> out) throws Exception {
                        OrderWide wide = new OrderWide(orderInfo, orderDetail);
                        out.collect(wide);
                    }
                };

        return infosByOrderId
                .intervalJoin(detailsByOrderId)
                .between(Time.seconds(-10), Time.seconds(10))
                .process(toOrderWide);
    }

    /**
     * Enriches each joined record with six dimension lookups using Flink async
     * I/O in unordered mode with a 60-second timeout.
     *
     * <p>The lookups run in a fixed order because the SPU, trademark and
     * category3 lookups use ids that are copied onto the record from the SKU
     * dimension row.
     *
     * <p>NOTE(review): every lookup result is dereferenced without a null check;
     * this assumes {@code DimUtil.readDim} always returns a row — confirm.
     *
     * @param orderWideWithoutDimStream joined fact records without dimension fields
     * @return the same records with dimension fields populated
     */
    private SingleOutputStreamOperator<OrderWide> factJoinDim(SingleOutputStreamOperator<OrderWide> orderWideWithoutDimStream) {
        DimAsyncFunction<OrderWide> dimLookup = new DimAsyncFunction<OrderWide>() {
            @Override
            public void addDim(Jedis jedis,
                               Connection conn,
                               OrderWide wide,
                               ResultFuture<OrderWide> resultFuture) throws Exception {
                // 1. User: gender and age (age derived from the birthday string).
                JSONObject userDim = DimUtil.readDim(jedis, conn, "dim_user_info", wide.getUser_id());
                wide.setUser_gender(userDim.getString("GENDER"));
                wide.calcuUserAge(userDim.getString("BIRTHDAY"));

                // 2. Province: name plus its various codes.
                JSONObject provinceDim = DimUtil.readDim(jedis, conn, "dim_base_province", wide.getProvince_id());
                wide.setProvince_name(provinceDim.getString("NAME"));
                wide.setProvince_3166_2_code(provinceDim.getString("ISO_3166_2"));
                wide.setProvince_area_code(provinceDim.getString("AREA_CODE"));
                wide.setProvince_iso_code(provinceDim.getString("ISO_CODE"));

                // 3. SKU: descriptive fields, plus the ids the next three lookups need.
                JSONObject skuDim = DimUtil.readDim(jedis, conn, "dim_sku_info", wide.getSku_id());
                wide.setSku_name(skuDim.getString("SKU_NAME"));
                wide.setSku_price(skuDim.getBigDecimal("PRICE"));
                wide.setSpu_id(skuDim.getLong("SPU_ID"));
                wide.setTm_id(skuDim.getLong("TM_ID"));
                wide.setCategory3_id(skuDim.getLong("CATEGORY3_ID"));

                // 4. SPU.
                JSONObject spuDim = DimUtil.readDim(jedis, conn, "dim_spu_info", wide.getSpu_id());
                wide.setSpu_name(spuDim.getString("SPU_NAME"));

                // 5. Trademark.
                JSONObject trademarkDim = DimUtil.readDim(jedis, conn, "dim_base_trademark", wide.getTm_id());
                wide.setTm_name(trademarkDim.getString("TM_NAME"));

                // 6. Category level 3.
                JSONObject category3Dim = DimUtil.readDim(jedis, conn, "dim_base_category3", wide.getCategory3_id());
                wide.setCategory3_name(category3Dim.getString("NAME"));

                // Emit the enriched record as the single result of this async request.
                resultFuture.complete(Collections.singletonList(wide));
            }
        };

        return AsyncDataStream.unorderedWait(
                orderWideWithoutDimStream,
                dimLookup,
                60,
                TimeUnit.SECONDS);
    }

    /**
     * Serializes each record to a JSON string and sends it to the DWM
     * order-wide Kafka topic.
     *
     * @param orderWideStream fully enriched order-wide records
     */
    private void writeToKafka(SingleOutputStreamOperator<OrderWide> orderWideStream) {
        SingleOutputStreamOperator<String> jsonStream = orderWideStream.map(JSON::toJSONString);
        jsonStream.addSink(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWM_ORDER_WIDE));
    }

}
